package com.atguigu.wc.source;

import com.atguigu.wc.util.ResourceUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

/**
 * Flink streaming job that consumes the {@code sensor} Kafka topic as strings and prints
 * every record via {@code DataStream.print()}.
 *
 * <p>The Kafka bootstrap servers are read from the {@code kafka.broker.list} entry of
 * {@code application.properties}, which is loaded through {@code ResourceUtils.load}.
 *
 * @author GaoFei
 */
public class SourceKafka {
    /** Name of the properties resource handed to {@code ResourceUtils.load}. */
    private static final String CONFIG_FILE = "application.properties";
    /** Key inside {@link #CONFIG_FILE} that holds the Kafka bootstrap servers. */
    private static final String BROKER_LIST_KEY = "kafka.broker.list";
    /** Kafka topic this job subscribes to. */
    private static final String TOPIC = "sensor";
    /** Kafka consumer group id used by this job. */
    private static final String GROUP_ID = "flink";

    /**
     * Builds the Kafka-to-stdout pipeline and runs it.
     *
     * @param args command-line arguments; not used
     * @throws IllegalStateException if the Kafka broker list is not configured
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // Resolve configuration first so a missing setting fails fast with a clear message.
        String brokerList = resolveBrokerList();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka key/value deserializer settings are intentionally not set: the Flink Kafka
        // consumer reads raw bytes and decodes them with the DeserializationSchema passed
        // below, so configuring them here would be ignored and only trigger warnings.
        Properties consumerProps = new Properties();
        consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);

        DataStream<String> sensor = env.addSource(
                new FlinkKafkaConsumer011<>(TOPIC, new SimpleStringSchema(), consumerProps));
        sensor.print();

        env.execute();
    }

    /**
     * Reads the Kafka broker list from {@link #CONFIG_FILE}.
     *
     * @return the non-blank value stored under {@link #BROKER_LIST_KEY}
     * @throws IllegalStateException if the properties could not be loaded or the key is
     *         missing or blank
     */
    private static String resolveBrokerList() {
        Properties appConfig = ResourceUtils.load(CONFIG_FILE);
        // Guard against a null result as well; Properties.setProperty rejects null values
        // with a bare NullPointerException that does not say which setting is missing.
        String brokerList = appConfig == null ? null : appConfig.getProperty(BROKER_LIST_KEY);
        if (brokerList == null || brokerList.trim().isEmpty()) {
            throw new IllegalStateException(
                    "Missing required property '" + BROKER_LIST_KEY + "' in " + CONFIG_FILE);
        }
        return brokerList;
    }
}
